"""
Implements the `~dnadna.snp_sample.SNPSample` class, a generic container for
DNADNA's SNP data, consisting of the SNP matrix itself and the SNP positions
array. It includes built-in methods for reading SNP data from different file
formats, as well as writing it out to different formats.
* Classes for reading and writing `SNPSample` objects to/from different file
formats and data representations. These are generally not used directly, but
rather through methods on the `SNPSample` class itself using the
``SNPSample.to/from_<format>`` methods. The available formats can be listed
like:
>>> from dnadna.snp_sample import SNPSample
>>> SNPSample.converter_formats
['dict', 'npz']
* `DictSNPConverter` - converts `SNPSample` to/from a JSON-serializable
`dict`-based format.
* `NpzSNPConverter` - serializes and deserializes an `SNPSample` to/from an
NPZ file.
Additional converters can be registered simply by defining subclasses of
`SNPConverter` (make sure the modules the classes are in are actually
imported).
"""
import abc
import os.path as pth
import pathlib
import types
import zipfile
from itertools import chain
import numpy as np
import torch
from .utils.decorators import cached_classproperty, cached_property, classproperty
from .utils.misc import flatten, indent
from .utils.serializers import GenericSerializer
from .utils.tensor import to_tensor
class _SNPSampleMeta(type):
"""Metaclass for `SNPSample` implementing some automatic functionality."""
def __getattr__(cls, attr):
"""
Implements automatic generation of SNPSample.to/from_<format>
methods.
"""
formats = {c.format: c for c in SNPConverter.converters if isinstance(c.format, str)}
if "_" in attr:
method, format = attr.split("_", 1)
if method not in ["to", "from"] or format not in formats:
raise AttributeError(attr)
return getattr(formats[format], "convert_" + method)
raise AttributeError(attr)
@property
def format_methods(cls):
formats = [c.format for c in SNPConverter.converters if isinstance(c.format, str)]
return list(flatten([f"to_{format}", f"from_{format}"] for format in formats))
def __dir__(cls):
return sorted(super().__dir__() + cls.format_methods)
[docs]class SNPSample(metaclass=_SNPSampleMeta):
"""
Class representing a single SNP sample from a population.
Consists of an array of shape ``(n, m)`` where ``n`` is the number of
individuals in the sample and ``m`` is the number of SNPs, along with a 1-D
array of shape ``(m,)`` of SNP positions in the nucleotide.
By default positions are assumed to be normalized to the range ``[0.0,
1.0]`` of absolute positions, but this can be changed with the
``pos_format`` argument (see below).
The SNP and pos arrays can be given in any type that can be easily
converted to a `torch.Tensor`.
Parameters
----------
snp : `list`, `numpy.ndarray`, `torch.Tensor`, optional
The SNP matrix. Must be provided unless a ``loader`` is provided.
pos : `list`, `numpy.ndarray`, `torch.Tensor`, optional
The positions array. Must be provided unless a ``loader`` is provided.
pos_format : `dict`, optional
A `dict` specifying how the positions are formatted. It can currently
contain up to 4 keys (see the ``position_format`` property in the
:ref:`dataset schema <schema-dataset>`). If not specified, the
default assumption is ``{'distance': False, 'circular': False,
'normalized': False}``, though it will be inferred whether or not the
positions are normalized if not otherwise specified.
path : object, optional
The path from which this `SNPSample` was loaded. Typically this will
be a filesystem path as a `str` or `pathlib.Path`, but it may be
anything depending on how the `SNPSample` as loaded. This is included
for informational purposes only.
copy : bool, optional
If `True` the data underlying ``snp`` and ``pos`` arguments are always
copied. If `False` (default) a copy will be avoided *if possible*, but
may still be necessary (e.g. when converting a Python `list` to
`torch.Tensor`, or when the dtype needs to be converted).
loader : `SNPLoader`, optional
If provided, the ``snp`` and/or ``pos`` arguments may be omitted. A
loader allows lazy-loading of SNP matrix data on-demand. See the
documentation for `SNPLoader`.
validate : bool, optional
Validate the formats of the SNP and position tensors. This can be
disabled for efficiency if you are sure they are already in the correct
format. When ``validate=False`` make sure also to supply a correct
``pos_format`` argument (default: True).
Examples
--------
>>> from dnadna.snp_sample import SNPSample
>>> snp = [[1, 0, 0, 1], [0, 1, 1, 0]]
>>> pos = [0.2, 0.4, 0.6, 0.8]
>>> samp = SNPSample(snp, pos)
>>> samp.snp
tensor([[1, 0, 0, 1],
[0, 1, 1, 0]])
>>> samp.pos
tensor([0.2000, 0.4000, 0.6000, 0.8000], dtype=torch.float64)
The SNP and position arrays can be combined into a single array in one of
two formats, ``.product`` which takes the product of the two arrays, with
the position array multiplied along the individuals axis:
>>> samp.product
tensor([[0.2000, 0.0000, 0.0000, 0.8000],
[0.0000, 0.4000, 0.6000, 0.0000]], dtype=torch.float64)
Or the two arrays can be simply concatenated into a ``(n + 1, m)`` array,
with the first row containing the positions and the remaining rows
containing the SNPs:
>>> samp.concat
tensor([[0.2000, 0.4000, 0.6000, 0.8000],
[1.0000, 0.0000, 0.0000, 1.0000],
[0.0000, 1.0000, 1.0000, 0.0000]], dtype=torch.float64)
Or just ``.tensor`` returns one or the other depending on the value of the
``.tensor_format`` attribute:
>>> bool((samp.concat == samp.tensor).all())
True
>>> samp2 = SNPSample(samp.snp, samp.pos, tensor_format='product')
>>> bool((samp2.product == samp2.tensor).all())
True
The optional ``path`` element (`None` by default) can give a data
source-specific path from which the sample was read (typically a
filename):
>>> SNPSample(snp, pos, path='one_event/scenario_000/one_event_000_0.npz')
SNPSample(
snp=tensor([[1, 0, 0, 1],
[0, 1, 1, 0]]),
pos=tensor([0.2000, 0.4000, 0.6000, 0.8000], dtype=torch.float64),
pos_format={'normalized': True},
path='one_event/scenario_000/one_event_000_0.npz'
)
"""
DEFAULT_POS_FORMAT = {"distance": False, "normalized": False, "circular": False}
TENSOR_FORMATS = ("concat", "product")
DEFAULT_TENSOR_FORMAT = "concat"
def __init__(
self,
snp=None,
pos=None,
pos_format=None,
tensor_format=None,
path=None,
copy=False,
loader=None,
validate=True,
):
if loader is None and (snp is None or pos is None):
raise TypeError(
"both the snp and pos arguments must be provided if a loader" "is not provided"
)
if snp is not None:
snp = to_tensor(snp, copy=copy)
if pos is not None:
pos = to_tensor(pos, copy=copy)
if pos.dim() > 1:
# On a 1x1 tensor torch.squeeze() actually makes it
# 0-dimensional which we don't want
pos = pos.squeeze()
if validate and snp is not None and pos is not None:
# Validate them now, otherwise we wait until all the data is loaded
# from the loader
pos_format = self._validate(snp, pos, pos_format)
self._snp = snp
self._pos = pos
self._pos_format = pos_format
self._path = path
self._tensor_format = tensor_format or self.DEFAULT_TENSOR_FORMAT
self._loader = loader
assert (
self._tensor_format in self.TENSOR_FORMATS
), f"tensor_format must be one of {self.TENSOR_FORMATS}"
def __dir__(self):
return sorted(super().__dir__() + self.__class__.format_methods)
@property
def snp(self):
"""The SNP matrix."""
if self._snp is None:
# This will only be called if the snp argument was not provided
# upon initialization
self._snp = self._data[0]
return self._snp
@property
def pos(self):
"""The positions array."""
if self._pos is None:
self._pos = self._data[1]
return self._pos
[docs] @cached_property
def shape(self):
"""The number of SNPs and number of individuals as a tuple."""
if self._snp is not None:
return (self._snp.shape[0], self._snp.shape[1])
else:
return self._loader.get_shape()
@property
def n_snp(self):
"""The number of SNPs in the sample."""
return self.shape[1]
@property
def n_indiv(self):
"""The number of individuals in the sample."""
return self.shape[0]
@property
def tensor(self):
"""
Either `SNPSample.concat` or `SNPSample.product` depending on the value
of `SNPSample.tensor_format`.
"""
return getattr(self, self.tensor_format)
@property
def product(self):
"""
The product of the ``pos`` array with the ``snp`` array.
The result has the same dtype as the ``pos`` array.
"""
return self.snp.to(self.pos.dtype).mul_(self.pos)
@property
def concat(self):
"""
The concatenation of the ``pos`` array with the ``snp`` array.
The result has the same dtype as the ``pos`` array.
"""
return torch.cat((self.pos.unsqueeze(0), self.snp.to(self.pos.dtype)))
@property
def pos_format(self):
"""
A `dict` specifying how the positions are formatted.
It can currently contain up to 4 keys (see the ``position_format``
property in the :ref:`dataset schema <schema-dataset>`). If not
specified, the default assumption is ``{'distance': False, 'circular':
False, 'normalized': False}``, though it will be inferred whether or
not the positions are normalized if not otherwise specified.
"""
if self._pos_format is None:
return self.DEFAULT_POS_FORMAT.copy()
else:
return self._pos_format
@property
def full_pos_format(self):
"""
Return the user-provided ``pos_format`` merged with the default value.
"""
full_pos_format = self.DEFAULT_POS_FORMAT.copy()
full_pos_format.update(self.pos_format)
return full_pos_format
@property
def tensor_format(self):
"""
The default format for `SNPSample.tensor` on this `SNPSample`.
If ``'concat'``, it is equivalent to `SNPSample.concat`, and if
``'product'`` it is equivalent to `SNPSample.product` (default:
``'concat'``).
"""
return self._tensor_format
@property
def path(self):
"""
The path from which this `SNPSample` was loaded.
Typically this will be a filesystem path as a `str` or `pathlib.Path`,
but it may be anything depending on how the `SNPSample` as loaded.
This is included for informational purposes only.
"""
return self._path
@property
def loader(self):
"""The `SNPLoader` used for lazy-loading this `SNPSample`, if any."""
return self._loader
[docs] @classmethod
def from_file(cls, filename_or_obj, **kwargs):
"""
Read an `SNPSample` from a file using one of the known `SNPSerializer`
types. The serialization format will be determined by the filename.
In the case of file-like objects it must have a ``.name`` or
``.filename`` attribute in order to guess the format.
.. todo::
It would be a good idea to use actual file format analysis to
determine the file type instead of just rely on the filename; for
now this is simpler and all that's needed in most cases.
For a usage example, see `SNPSample.to_file`.
"""
return SNPSerializer.load(filename_or_obj, **kwargs)
[docs] def to_file(self, filename_or_obj, **kwargs):
"""
Serialize the `SNPSample` to a file or file-like object.
The appropriate serializer will be determined by the filename, as in
`SNPSample.from_file`.
Examples
--------
>>> import io
>>> from dnadna.snp_sample import SNPSample
>>> out = io.BytesIO()
A filename ending with ``.npz`` indicates the NPZ-based DNADNA format:
>>> out.name = 'out.npz'
>>> snp = SNPSample([[0, 1], [0, 0]], [0.1, 0.2])
>>> snp.to_file(out)
>>> _ = out.seek(0)
>>> snp2 = SNPSample.from_file(out)
>>> snp == snp2
True
"""
return SNPSerializer.save(self, filename_or_obj, **kwargs)
@classproperty
def converter_formats(cls):
"""
List the names of all converter formats available for `SNPSample`.
For each format in this list, there is are associated
``SNPSample.to_<format>`` and ``SNPSample.from_<format>`` methods
available (where the latter is a `classmethod`).
Examples
--------
>>> from dnadna.snp_sample import SNPSample
>>> SNPSample.converter_formats
['dict', 'npz']
>>> SNPSample.from_dict
<bound method DictSNPConverter.from_dict of
<class 'dnadna.snp_sample.DictSNPConverter'>>
>>> snp = SNPSample([[1, 0], [0, 1]], [0, 1])
<bound method DictSNPConverter.to_dict of SNPSample(
snp=tensor([[1, 0, 1],
[0, 1, 0],
[0, 1, 0]], dtype=torch.uint8),
pos=tensor([1, 2, 3],
tensor_format='concat')
)>
As you can see in the above examples, the converter methods are
actually defined on the `DictSNPConverter` class, but they are made
available directly as methods on `SNPSample`.
See also `dir` of `SNPSample` for a list of methods:
>>> dir(SNPSample)
[...from_dict, from_npz, ..., to_dict, to_npz...]
"""
return SNPConverter.formats
@cached_property
def _data(self):
"""
Returns the SNP matrix and pos arrays from the loader if they were not
already set.
"""
snp, pos = self._loader.get_data()
self._pos_format = self._validate(snp, pos, self._pos_format)
return snp, pos
def __getattr__(self, attr):
"""
Implement lookup of the ``from/to_<format>`` methods on `SNPSample`
instances.
This just goes directly through the metaclass for this, but we also
have to implement it at the instance level. For plain functions we
must still bind them to the instance.
Tests
-----
Regression test: Ensure that if a property which is defined on the
class bubbles up an `AttributeError`, then an `AttributeError` is still
raised for that property as opposed to a more obscure error.
>>> from dnadna.snp_sample import SNPSample
>>> class BrokenSNPSample(SNPSample):
... @property
... def snp(self):
... return self._does_not_exist
...
>>> s = BrokenSNPSample([[0, 1], [1, 0]], [0, 1])
>>> s.snp
Traceback (most recent call last):
...
AttributeError: snp. Did you mean: '_snp'?
"""
method = type(self.__class__).__getattr__(self.__class__, attr)
if not hasattr(method, "__self__"):
# It is not yet a bound method
method = types.MethodType(method, self)
return method
def __repr__(self):
cls_name = self.__class__.__name__
# NOTE: Ensure that self.snp and self.pos are loaded before
# formatting pos_format, since self.pos_format can be set when
# lazy-loading self.snp or self.pos
snp = indent(repr(self.snp), 8, first=False)
pos = indent(repr(self.pos), 8, first=False)
pos_format = (
f",\n pos_format={self.pos_format}"
if self.full_pos_format != self.DEFAULT_POS_FORMAT
else ""
)
tensor_format = (
f",\n tensor_format={self.tensor_format}"
if self.tensor_format != self.DEFAULT_TENSOR_FORMAT
else ""
)
path = f",\n path={self.path!r}" if self.path is not None else ""
return (
f"{cls_name}(\n"
f" snp={snp},\n"
f" pos={pos}"
f"{pos_format}{tensor_format}{path}\n)"
)
def __eq__(self, other):
"""
Two `SNPSample`s are equal if their SNP and position arrays are equal.
This does not take into account other possible isomorphisms.
Examples
--------
>>> from dnadna.snp_sample import SNPSample
>>> sample1 = SNPSample([[1, 0], [0, 1]], [0.2, 0.4])
>>> sample2 = SNPSample([[1, 1], [1, 0]], [0.3, 0.5])
>>> sample1 == sample1
True
>>> sample1 == sample2
False
"""
if not isinstance(other, SNPSample):
return False
return bool((self.pos == other.pos).all() and (self.snp == other.snp).all())
[docs] def copy(self):
"""
Creates a copy of this `SNPSample`, including copying the ``snp`` and
``pos`` tensors.
"""
return self.copy_with(copy=True)
[docs] def copy_with(
self,
snp=None,
pos=None,
pos_format=None,
tensor_format=None,
path=None,
copy=False,
validate=None,
):
"""
Creates a copy of this `SNPSample` instance with any of the fields
replaced.
If ``copy=True`` the storage for the ``snp`` and ``pos`` tensors is
also copied; otherwise the same storage is referenced in the new
`SNPSample`.
"""
# Only re-validate if any of the SNP/pos matrices have changed or the
# position format has changed
if validate is None:
validate = snp is not None or pos is not None or pos_format is not None
return self.__class__(
snp if snp is not None else self.snp,
pos if pos is not None else self.pos,
pos_format if pos_format is not None else self.pos_format.copy(),
tensor_format if tensor_format is not None else self.tensor_format,
path if path is not None else self.path,
copy=copy,
validate=validate,
)
def _validate(self, snp, pos, pos_format=None):
"""
Determine the position format and check that it's consistent with
the given positions array.
If the ``'normalized'`` keyword is not otherwise specified we can
infer it by the data (i.e. if all values are < 1).
Examples
--------
Several possible errors can be raised if the positions are not in a
consistent format. By default, positions are assumed to be
non-normalized, non-circular absolute positions:
>>> from dnadna.snp_sample import SNPSample
>>> sample = SNPSample([[0, 1, 0], [1, 0, 1]], [1, 2, 3])
>>> sample
SNPSample(
snp=tensor([[0, 1, 0],
[1, 0, 1]]),
pos=tensor([1, 2, 3])
)
If the positions are not in the default format this is displayed
explicitly:
>>> sample = SNPSample([[0, 1, 0], [1, 0, 1]], [0.1, 0.2, 0.3],
... pos_format={'distance': True,
... 'normalized': True})
>>> sample
SNPSample(
snp=tensor([[0, 1, 0],
[1, 0, 1]]),
pos=tensor([0.1000, 0.2000, 0.3000], dtype=torch.float64),
pos_format={'distance': True, 'normalized': True}
)
If the positions are normalized this will be inferred:
>>> sample = SNPSample([[0, 1, 0], [1, 0, 1]], [0.1, 0.2, 0.3])
>>> sample
SNPSample(
snp=tensor([[0, 1, 0],
[1, 0, 1]]),
pos=tensor([0.1000, 0.2000, 0.3000], dtype=torch.float64),
pos_format={'normalized': True}
)
If we specify ``normalized=False`` but the positions are clearly
normalized (or vice-versa) this will result in a `ValueError`:
>>> sample = SNPSample([[0, 1, 0], [1, 0, 1]], [0.1, 0.2, 0.3],
... pos_format={'normalized': False})
Traceback (most recent call last):
...
ValueError: specified normalized=False in pos_format even though
the positions appear to be normalized
"""
assert snp.dim() == 2, (
"snp must by a 2-D array of shape (n, m) where n is the "
"number of individuals in the sample and m the number of "
"SNPs"
)
assert pos.shape[0] == snp.shape[1], (
f"the size of the pos array ({pos.shape[0]}) must be the "
f"same as the number of SNPs in the snp array "
f"({snp.shape[1]})"
)
if pos_format is None:
pos_format = {}
normalized = bool((pos <= 1.0).all())
if "normalized" in pos_format:
if pos_format["normalized"] and not normalized:
raise ValueError(
"specified normalized=True in pos_format even though the "
"positions appear to not be normalized"
)
elif not pos_format["normalized"] and normalized:
raise ValueError(
"specified normalized=False in pos_format even though the "
"positions appear to be normalized"
)
else:
# Set this in case it was not otherwise specified
pos_format["normalized"] = normalized
if (pos < 0).any():
raise ValueError("positions/distances must be non-negative")
return pos_format
[docs]class SNPLoader(metaclass=abc.ABCMeta):
"""
Base class for SNP loaders.
A loader is used for lazy-loading of SNP data. While the `SNPConverter`
classes are converting `SNPSample` objects to/from different formats (e.g.
different file formats), a *loader* simply provides methods for getting the
SNP matrix and position array data on-demand.
An `SNPLoader` must at minimum implement the `SNPLoader.get_data` method
which returns a tuple of `torch.Tensor` objects for the SNP matrix and
position arrays respectively.
It may optionally implement an `SNPLoader.get_shape` which returns a tuple
``(n_indiv, n_snp)``--the number of SNPs and the number of individuals in
the sample. This can be used as an optimization to get the dimensions of a
sample without loading the full data.
"""
[docs] @abc.abstractmethod
def get_data(self):
"""
Returns the SNP matrix and position array of an SNP sample as a tuple
of `torch.Tensor`.
Must be implemented by subclasses.
"""
[docs] def get_shape(self):
"""
Returns the dimensions of an `SNPSample` as a tuple of ``(n_indiv,
n_snp)``.
The default implementation simply calls `SNPLoader.get_data` and
returns the dimensions of the tensors. However, this may be overridden
by subclasses to provide a more efficient implementation, e.g. that
does not require loading the full data if there is metadata available
to provide this information.
"""
snp, _ = self.get_data()
return tuple(snp.shape)
[docs]class SNPConverter(metaclass=abc.ABCMeta):
"""
Base class for converters between `SNPSample` and other objects
representing SNPs.
Similar interface to `~dnadna.utils.serializers.GenericSerializer`
except the inputs and outputs need not be files. In the case
of `SNPSerializer` they are files, but see `DictSNPConverter` for
a counter-example.
"""
@abc.abstractproperty
def format(self):
"""
Name of the format this implements (which may be different from the
filename extension(s). This is used to generate ``to/from_<format>``
methods on `SNPSample`.
"""
[docs] @abc.abstractclassmethod
def convert_from(cls, obj, *args, **kwargs):
"""Convert the given object to an `SNPSample`."""
[docs] @abc.abstractmethod
def convert_to(self, *args, **kwargs):
"""
Convert the given `SNPSample` to the desired output
type.
.. note::
The way these classes are used is such that they are never
instantiated, but are instead containers for methods on the
`SNPSample` class itself (see ``_SNPSampleMeta`` in the source
code).
This is because when the ``.convert_to()`` method is called,
``self`` is not an instance of an `SNPConverter`, but rather it is
an instance of `SNPSample`.
"""
def __init_subclass__(cls):
"""
Every time a new `SNPConverter` subclass comes online we must
invalidate the cache for `SNPConverter.converters`.
"""
SNPConverter.__dict__["converters"].invalidate(SNPConverter)
@cached_classproperty
def converters(cls):
"""
List of all converter classes.
This is cached to speed up in the future, but it relies on recursively
evaluating all its ``__subclasses__()``. Therefore if any new
subclasses are defined we need to invalidate the cache each time (see
``SNPConverter.__init_subclass__``).
Examples
--------
Test that this invalidation actually occurs when defining a new
subclass:
>>> from dnadna.snp_sample import SNPConverter
>>> SNPConverter.formats
['dict', 'npz']
>>> class MyConverter(SNPConverter):
... # note: it's not strictly necessary to define the to/from
... #methods
... format = 'my_format'
>>> SNPConverter.formats
['dict', 'my_format', 'npz']
There is, however, no way to "unregister" formats under this mechanism,
but in practice that would be rare. We just have to delete the
subclass and then manually perform the cache invalidation e.g. by
manually calling ``__init_subclass__`` in order to clean up:
>>> n_subclasses = len(SNPConverter.__subclasses__())
>>> del MyConverter
>>> SNPConverter.__init_subclass__()
Note: It's not enough just to ``del MyConverter``. Apparently
``type.__subclasses__`` can still holds on to weak references (possibly
as a `weakref.WeakSet`?) so there is a risk of resurrecting the deleted
class if we try to rebuild the cache. Run a few rounds of garbage
collection to really make sure it's gone:
>>> import gc
>>> while len(SNPConverter.__subclasses__()) > n_subclasses - 1:
... _ = gc.collect()
>>> SNPConverter.formats
['dict', 'npz']
"""
# Note: type.__subclasses__() only returns immediate subclasses, but we
# want to be able to check over all derived subclasses of SNPConverter
def subclasses_recursive(cls):
return chain(
[cls], flatten(subclasses_recursive(subcls) for subcls in cls.__subclasses__())
)
return list(subclasses_recursive(cls))
def _formats(cls):
"""
Returns just the format names of all registered non-abstract
converters.
Examples
--------
>>> from dnadna.snp_sample import SNPConverter
>>> SNPConverter.formats
['dict', 'npz']
"""
return sorted(c.format for c in cls.converters if isinstance(c.format, str))
# NOTE: This is implemented without using a decorator, since otherwise
# pytest will not locate the doctest.
formats = classproperty(_formats)
[docs]class DictSNPConverter(SNPConverter, SNPLoader):
"""
Converts `SNPSamples <SNPSample>` to/from a
JSON-compatible dict format.
Also acts as an `SNPLoader` for lazy-loading when
`DictSNPConverter.from_dict` is passed ``lazy=True`` (the default).
See `DictSNPConverter.convert_to` for a description of the data format.
"""
format = "dict"
def __init__(self, data, keys=("SNP", "POS")):
self.data = data
self.keys = keys
[docs] def get_data(self):
return self._from_dict(self.data, self.keys)
[docs] @classmethod
def from_dict(cls, data, keys=("SNP", "POS"), pos_format=None, path=None, lazy=True):
"""
Convert a JSON-compatible data structure to an `SNPSample`.
See `DictSNPConverter.convert_to` for a description of the data format.
Examples
--------
>>> from dnadna.snp_sample import SNPSample
>>> import numpy as np
Random SNP and position arrays:
>>> snp = (np.random.random((10, 10)) >= 0.5).astype('uint8')
>>> pos = np.sort(np.random.random(10))
>>> sample = SNPSample(snp, pos)
>>> sample2 = SNPSample.from_dict(sample.to_dict())
>>> sample == sample2
True
"""
if lazy:
return SNPSample(pos_format=pos_format, path=path, loader=cls(data, keys))
snp, pos = cls._from_dict(data, keys)
return SNPSample(snp=snp, pos=pos, pos_format=pos_format, path=path)
[docs] def to_dict(self, keys=("SNP", "POS")):
"""
Convert the `SNPSample` to a JSON-compatible representation.
This format is similar to the NPZ format in that the SNP matrix and
position arrays are output to properties given by the ``keys``
argument, which defaults to ``('SNP', 'POS')``.
The position array is written as a JSON array of floats. The SNP
matrix is written in a compact representation consisting of an array of
SNPs, with each SNP represented as a *string* of ``1`` s and ``0`` s.
Examples
--------
>>> from dnadna.snp_sample import SNPSample
>>> snp = [[1, 0, 1], [0, 1, 0], [1, 1, 0]]
>>> pos = np.array([0.1, 0.2, 0.3], dtype=np.float64)
>>> sample = SNPSample(snp, pos)
>>> sample.to_dict()
{'SNP': ['101', '010', '110'], 'POS': [0.1, 0.2, 0.3]}
"""
snp_bytes = self.snp.numpy().astype("S1")
return {
keys[0]: [s.tobytes().decode("ascii") for s in snp_bytes],
keys[1]: self.pos.tolist(),
}
@staticmethod
def _from_dict(data, keys=("SNP, POS")):
"""Convert the dict structure to `torch.Tensor`."""
# Convert the SNP matrix to a NumPy array first, since NumPy will
# convert strings containing digits to numbers
snp = np.array([list(s) for s in data[keys[0]]], dtype=np.uint8)
snp = to_tensor(snp, dtype=torch.uint8)
# Assume maximum available precision; if need be we can add an option
# later to specify dtype/precision for the positions.
pos = to_tensor(data[keys[1]], dtype=torch.float64)
return snp, pos
# Note: We do this so that the actual methods have more the more
# descriptive names from_dict and to_dict, otherwise the only good way to
# make a Python function with a different name/signature is to mess around
# with exec or code types.
#
# This pattern should be repeated in other SNPConverters (though it is not
# required).
convert_from = from_dict
convert_to = to_dict
[docs]class SNPSerializer(GenericSerializer):
"""Base class for `SNPSample` serializers."""
types = set([SNPSample])
[docs]class NpzSNPConverter(SNPSerializer, SNPConverter, SNPLoader):
"""
Serialize `SNPSamples <SNPSample>` to/from NPZ_ files.
Provides ``SNPSample.to/from_npz`` methods.
Also acts as an `SNPLoader` for lazy-loading when
`NpzSNPConverter.from_npz` is passed ``lazy=True`` (the default).
"""
format = "npz"
extensions = [".npz"]
binary = True
def __init__(self, filename, keys=("SNP", "POS")):
self.filename = filename
self.keys = keys
[docs] def get_data(self):
return self._from_npz(self.filename, keys=self.keys)
[docs] def get_shape(self):
"""
For NPZ files it is possible to get the array shapes by reading the
metadata without extracting the entire array.
It should be sufficient to find just the metadata for the SNP matrix.
Examples
--------
>>> from dnadna.snp_sample import SNPSample, NpzSNPConverter
>>> import io
>>> out = io.BytesIO()
>>> snp = SNPSample([[1, 0], [0, 1], [1, 1]], [2, 3])
>>> snp.to_npz(out)
>>> out.seek(0)
0
>>> conv = NpzSNPConverter(out)
>>> conv.get_shape()
(3, 2)
"""
snp_filename = self.keys[0] + ".npy"
with zipfile.ZipFile(self.filename) as archive:
with archive.open(snp_filename) as fobj:
version = np.lib.format.read_magic(fobj)
version = "_".join(str(v) for v in version)
func_name = f"read_array_header_{version}"
func = getattr(np.lib.format, func_name)
shape, _, _ = func(fobj)
return tuple(shape)
[docs] @classmethod
def load(cls, filename_or_obj, keys=("SNP", "POS"), pos_format=None, lazy=True):
"""
Implements the `~dnadna.utils.serializers.GenericSerializer`
interface for loading data from an NPZ file.
"""
filename = cls.to_filename(filename_or_obj)
if isinstance(filename, (str, pathlib.Path)) and pth.exists(filename):
# If a filename can be extracted from filename_or_obj, then use
# that filename as the path to the SNP
# Otherwise it might be some file-like object (e.g. an io.BytesIO)
# that does not have a filename
filename = pth.abspath(filename)
path = pathlib.Path(filename)
loader = cls(filename, keys)
else:
path = None
loader = cls(filename_or_obj, keys)
if lazy:
return SNPSample(pos_format=pos_format, path=path, loader=loader)
snp, pos = cls._from_npz(filename, keys)
return SNPSample(
snp=snp, pos=pos, pos_format=pos_format, path=pathlib.Path(cls.to_filename(filename))
)
[docs] @classmethod
def save(cls, obj, filename, keys=("SNP", "POS"), compressed=True):
"""
Implements the `~dnadna.utils.serializers.GenericSerializer`
interface for saving data to an NPZ file.
"""
savez = np.savez_compressed if compressed else np.savez
data = {keys[0]: obj.snp.numpy(), keys[1]: obj.pos.numpy()}
savez(filename, **data)
[docs] @classmethod
def from_npz(cls, filename, keys=("SNP", "POS"), pos_format=None, lazy=True):
"""
Read a `SNPSample` from a NumPy NPZ_ file.
An NPZ file can contain multiple arrays, each keyed by an array name.
For SNP samples it is assumed that a given NPZ file contains at least a
SNP matrix array and a position array. The argument keys (default
``('SNP', 'POS')``) should be a 2-tuple giving the array names to look
for in the SNP file for the SNP matrix and the positions respectively.
Examples
--------
>>> import numpy as np
>>> import tempfile
>>> import os
>>> from dnadna.snp_sample import SNPSample
Random SNP and position arrays:
>>> tmp = tempfile.mkdtemp()
>>> file_path = os.path.join(tmp, 'test.npz')
>>> snp = (np.random.random((10, 10)) >= 0.5).astype('uint8')
>>> pos = np.sort(np.random.random(10))
>>> np.savez(file_path, SNP=snp, POS=pos)
>>> sample = SNPSample.from_npz(file_path)
>>> bool((sample.snp.numpy() == snp).all())
True
>>> bool((sample.pos.numpy() == pos).all())
True
.. _NPZ: https://numpy.org/devdocs/reference/generated/numpy.savez.html#numpy.savez
"""
return cls.load(filename, keys=keys, pos_format=pos_format, lazy=lazy)
[docs] def to_npz(self, filename, keys=("SNP", "POS"), compressed=True):
"""
Write a `SNPSample` to a NumPy NPZ_ file.
An NPZ file can contain multiple arrays, each keyed by an array name.
See also `NpzSNPConverter.load` for the converse. The ``keys=('SNP',
'POS')`` argument can be overridden to save with different names for
the SNP and position arrays.
If ``compressed=True`` (default) the NPZ archive is written with zip
compression.
.. _NPZ: https://numpy.org/devdocs/reference/generated/numpy.savez.html#numpy.savez
Examples
--------
>>> import numpy as np
>>> import tempfile
>>> import os
>>> from dnadna.snp_sample import SNPSample
Random SNP and position arrays:
>>> tmp = tempfile.mkdtemp()
>>> file_path = os.path.join(tmp, 'test.npz')
>>> snp = (np.random.random((10, 10)) >= 0.5).astype('uint8')
>>> pos = np.sort(np.random.random(10))
>>> sample = SNPSample(snp, pos)
>>> sample.to_npz(file_path)
>>> sample == SNPSample.from_npz(file_path)
True
"""
# Because of how convert_to works, `self` here is an SNPSample
# instance, not an NpzSNPConverter instance, so we must use the class
# explicitly for the .save() method
return NpzSNPConverter.save(self, filename, keys=keys, compressed=compressed)
convert_from = from_npz
convert_to = to_npz
@staticmethod
def _from_npz(filename, keys=("SNP", "POS")):
"""Load the raw snp and pos tensors."""
with np.load(filename) as npz_data:
snp = to_tensor(npz_data[keys[0]])
pos = to_tensor(npz_data[keys[1]])
return snp, pos